package zookeeper

import (
	"errors"
	"gitee.com/dennis-kk/service-box-go/common"
	perror "gitee.com/dennis-kk/service-box-go/util/errors"
	"gitee.com/dennis-kk/service-box-go/util/service_infra"
	"gitee.com/dennis-kk/service-box-go/util/slog"
	"github.com/go-zookeeper/zk"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// defaultTimeout is passed to zk.Connect in Start and also bounds how long
	// Start waits for the connection to reach a usable state.
	defaultTimeout = 5 * time.Second
	// once wraps the registration performed in init.
	once           sync.Once
)

func init() {
	once.Do(func() {
		service_infra.RegisterServiceInfra("zookeeper", makeZKServiceInfra)
	})
}

// zkClientStatus is the connection status tracked by zkServiceInfra and
// reported through IsConnected.
type zkClientStatus int

const (
	// zkDisConnected is the initial status assigned by makeZKServiceInfra.
	zkDisConnected zkClientStatus = iota
	// zkConnected is set by Start once the prefix path has been prepared.
	zkConnected
)

// zkServiceInfra is the zookeeper-backed implementation of the service infra
// interface. It owns one zookeeper connection, a background goroutine (loop)
// and two channels used to hand events between that goroutine and Tick.
type zkServiceInfra struct {
	opts      *service_infra.Options           // zookeeper options, filled in by Init
	conn      *zk.Conn                         // zookeeper connection, created by Start
	logger    *zkLogHelper                     // logger handed to the zk client
	status    zkClientStatus                   // zookeeper client connect status
	zkWatcher <-chan zk.Event                  // event channel returned by zk.Connect
	zkMainCh  chan *zkEventData                // written by the handlers that run in loop, drained by Tick
	zkNetCh   chan *zkEventData                // written by FindServiceAsync, read by loop
	varietyEH service_infra.ServiceEventHandle // the single service event handler installed by AddListener
	closeSign chan struct{}                    // tells loop to exit
	wg        sync.WaitGroup                   // tracks the loop goroutine so Stop can wait for it
}

// makeZKServiceInfra builds a not-yet-connected zkServiceInfra: empty options,
// a log helper, disconnected status, two event channels buffered to 16 entries
// and an unbuffered close signal. Connection, watcher and listener stay at
// their zero values until Start / AddListener fill them in.
func makeZKServiceInfra() service_infra.IServiceInfra {
	infra := new(zkServiceInfra)
	infra.opts = new(service_infra.Options)
	infra.logger = makeZkLogHelper(0)
	infra.status = zkDisConnected
	infra.zkMainCh = make(chan *zkEventData, 16)
	infra.zkNetCh = make(chan *zkEventData, 16)
	infra.closeSign = make(chan struct{})
	return infra
}

// Init applies every supplied option to the stored options in order and then
// runs the options' PreProcess step. It always returns nil.
func (sf *zkServiceInfra) Init(opts ...service_infra.Option) error {
	for i := range opts {
		apply := opts[i]
		apply(sf.opts)
	}
	sf.opts.PreProcess()
	return nil
}

// Start connects to the configured zookeeper servers, waits up to
// defaultTimeout for the connection to reach a usable state, creates the
// prefix path and finally launches the background event loop.
//
// If anything fails after the connection object has been created, the
// connection is closed and cleared again so a failed Start does not leave an
// open client behind.
func (sf *zkServiceInfra) Start() error {
	var err error
	sf.conn, sf.zkWatcher, err = zk.Connect(sf.opts.Hosts, defaultTimeout, zk.WithLogger(sf.logger))
	if err != nil {
		slog.Error("[zookeeper] connect to zookeeper servers %v error %v !", sf.opts.Hosts, err)
		return err
	}

	// abort releases the half-initialised connection and hands back cause.
	abort := func(cause error) error {
		sf.conn.Close()
		sf.conn, sf.zkWatcher = nil, nil
		return cause
	}

	// Poll the connection state on a short ticker instead of spinning on a
	// select/default, which would burn a full CPU core while waiting.
	deadline := time.NewTimer(defaultTimeout)
	defer deadline.Stop()
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		state := sf.conn.State()
		if state == zk.StateConnected || state == zk.StateHasSession {
			break
		}
		select {
		case <-deadline.C:
			return abort(errors.New("connect to server time out"))
		case <-poll.C:
		}
	}

	atomic.StoreUint32(&sf.logger.open, 1)

	// Create the prefix path ahead of time.
	if err = sf.createParentPath(sf.opts.Prefix); err != nil {
		return abort(err)
	}
	sf.status = zkConnected

	sf.wg.Add(1)
	go sf.loop()
	return nil
}

// Stop shuts the background loop down, waits for it to exit and closes the
// zookeeper connection. It is safe to call on a nil receiver, before Start,
// after the loop has already exited on its own, and more than once in
// sequence. It always returns nil.
func (sf *zkServiceInfra) Stop() error {
	if sf == nil {
		// has dis connected to server
		return nil
	}

	select {
	case <-sf.closeSign:
		// Already closed by an earlier Stop; nothing left to do.
		return nil
	default:
	}

	// Closing the channel is enough to wake the loop's receive. Unlike a send
	// on the unbuffered channel it cannot block when the loop has already
	// returned (session expired, watcher closed) or was never started.
	close(sf.closeSign)

	sf.wg.Wait()
	if sf.conn != nil {
		sf.conn.Close()
	}
	sf.status = zkDisConnected
	return nil
}

// Tick drains every event currently queued on zkMainCh, dispatching each one
// through dealLoopEvent, and returns as soon as the channel has nothing ready.
// It never blocks waiting for new events.
func (sf *zkServiceInfra) Tick() {
	for {
		select {
		case pending := <-sf.zkMainCh:
			sf.dealLoopEvent(pending)
			continue
		default:
		}
		return
	}
}

// FindServiceAsync queues a lookup for svsName on zkNetCh so the background
// loop can process it.
//
// It returns common.ErrServiceFinderConn when the connection is not valid and
// common.ErrFindServiceTimeout when the request could not be queued within one
// second; otherwise nil.
func (sf *zkServiceInfra) FindServiceAsync(svsName string) error {
	if !isZKConnValid(sf.conn) {
		slog.Error("[Zookeeper] invalid zookeeper connection state: %d ", sf.conn.State())
		return common.ErrServiceFinderConn
	}

	event := &zkEventData{
		eType: zkFindService,
		name:  svsName,
	}
	event.path = preProcessSvsPath(sf.opts.Prefix, svsName)

	//TODO get timer with object pool
	// Block on the send for at most one second.
	timer := time.NewTimer(1 * time.Second)
	// Release the timer as soon as we return instead of leaving it pending
	// until it fires on the (common) successful path.
	defer timer.Stop()

	select {
	case sf.zkNetCh <- event:
		return nil
	case <-timer.C:
		return common.ErrFindServiceTimeout
	}
}

// FindService synchronously looks up the hosts registered under svsName.
//
// It returns (nil, nil) when the service node does not exist or has no
// children, a ServiceInfo whose Hosts are the child node names otherwise, and
// a non-nil error when the connection is invalid or a zookeeper call fails.
func (sf *zkServiceInfra) FindService(svsName string) (*service_infra.ServiceInfo, error) {
	if !isZKConnValid(sf.conn) {
		slog.Error("[Zookeeper] invalid zookeeper connection state: %d ", sf.conn.State())
		// Report the broken connection instead of (nil, nil), which callers
		// cannot tell apart from "service not found"; this matches the error
		// FindServiceAsync returns in the same situation.
		return nil, common.ErrServiceFinderConn
	}
	//pre deal svsName
	svsName = preProcessSvsPath(sf.opts.Prefix, svsName)

	exists, _, err := sf.conn.Exists(svsName)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, nil
	}

	children, _, err := sf.conn.Children(svsName)
	if err != nil {
		return nil, err
	}
	if len(children) == 0 {
		return nil, nil
	}

	return &service_infra.ServiceInfo{Hosts: children}, nil
}

// RegisterService makes sure the node for svsName exists (creating missing
// parent paths on the way) and then creates one ephemeral child node per host
// in info.Hosts. Hosts that are already registered are skipped.
//
// It returns perror.InvalidZKConn when the connection is not valid, nil when
// info is nil, and otherwise the first zookeeper error encountered.
func (sf *zkServiceInfra) RegisterService(svsName string, info *service_infra.ServiceInfo) error {
	if !isZKConnValid(sf.conn) {
		slog.Error("[Zookeeper] invalid service connect while service %s register ", svsName)
		// Surface the failure; returning nil here would make the caller
		// believe the service had been registered.
		return perror.InvalidZKConn
	}

	if info == nil {
		//FIXME add common err and add log
		return nil
	}

	//pre deal svsName
	svsName = preProcessSvsPath(sf.opts.Prefix, svsName)
	//check node exits
	exists, _, err := sf.conn.Exists(svsName)
	if err != nil {
		sf.logger.Printf("register service %s error %v ", svsName, err)
		return err
	}
	// zk: ephemeral nodes may not have children, so the service node itself
	// is created as a regular (non-ephemeral) node.
	if !exists {
		//TODO check path from cache
		for i, v := range svsName {
			//create parents path
			if v == '/' && i != 0 {
				if err = sf.tryCreateZKPath(svsName[:i]); err != nil {
					return err
				}
			}
		}
		_, err = sf.conn.Create(svsName, nil, 0, zk.WorldACL(zk.PermAll))
		// Another client may have created the node between Exists and
		// Create; that is the outcome we wanted, not a failure.
		if err != nil && !errors.Is(err, zk.ErrNodeExists) {
			return err
		}
	}

	// set service info
	for _, v := range info.Hosts {
		//service path
		path := svsName + "/" + v
		exists, _, err = sf.conn.Exists(path)
		if err != nil {
			slog.Error("[Zookeeper] %s check path exists error %v", path, err)
			return err
		}
		if exists {
			continue
		}

		_, err = sf.conn.Create(path, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err != nil && !errors.Is(err, zk.ErrNodeExists) {
			slog.Error("[Zookeeper] create path %s error %v", path, err)
			return err
		}
	}

	return nil
}

// UnRegisterService deletes the child node of every host in info.Hosts from
// under the svsName node. Hosts whose node is already gone are treated as
// successfully unregistered.
//
// It returns perror.InvalidZKConn for a nil receiver or an invalid
// connection, nil when info is nil, and otherwise the first zookeeper error
// encountered.
func (sf *zkServiceInfra) UnRegisterService(svsName string, info *service_infra.ServiceInfo) error {
	if sf == nil || !isZKConnValid(sf.conn) {
		return perror.InvalidZKConn
	}
	if info == nil {
		//FIXME add common err and err log
		return nil
	}
	//pre deal service name path
	svsName = preProcessSvsPath(sf.opts.Prefix, svsName)

	for _, v := range info.Hosts {
		err := sf.conn.Delete(svsName+"/"+v, -1)
		// A missing node means there is nothing left to unregister for this
		// host; keep going so the remaining hosts are still removed.
		if err != nil && !errors.Is(err, zk.ErrNoNode) {
			return err
		}
	}
	return nil
}

// AddListener installs handle as the service event handler, replacing any
// handler set earlier. It always returns nil.
func (sf *zkServiceInfra) AddListener(handle service_infra.ServiceEventHandle) error {
	sf.varietyEH = handle
	return nil
}

// DelListener clears the installed service event handler. Calling it on a nil
// receiver is a no-op. It always returns nil.
func (sf *zkServiceInfra) DelListener() error {
	//TODO add common log for the nil receiver case
	if sf != nil {
		sf.varietyEH = nil
	}
	return nil
}

// IsConnected reports whether the stored status is zkConnected.
func (sf *zkServiceInfra) IsConnected() bool {
	return sf.status == zkConnected
}

// loop is the background goroutine started by Start. It serves three sources
// until one of them asks it to stop:
//   - zkNetCh: requests queued for processing, dispatched via dealLoopEvent;
//   - zkWatcher: zookeeper events, dispatched via dealZKEvent;
//   - closeSign: the shutdown signal.
//
// It exits on a nil request, a closed watcher channel, an event carrying an
// error, a StateExpired event, or the shutdown signal. On exit it logs the
// reason and marks the wait group done.
func (sf *zkServiceInfra) loop() {
	var exitReason string

	defer func() {
		sf.logger.Printf("exit zkServiceInfra loop with %s", exitReason)
		sf.wg.Done()
	}()

	for {
		select {
		case <-sf.closeSign:
			// graceful shutdown
			exitReason = "closing signal"
			return

		case request := <-sf.zkNetCh:
			if request == nil {
				// a nil request is treated as end-of-stream
				exitReason = "zkNetCh has been closed"
				return
			}
			sf.dealLoopEvent(request)

		case watchEvent, open := <-sf.zkWatcher:
			if !open {
				exitReason = "zkWatcher has been closed"
				sf.logger.Printf("zookeeper event channel has been closed !")
				return
			}

			if watchErr := watchEvent.Err; watchErr != nil {
				exitReason = watchErr.Error()
				sf.logger.Printf("get zookeeper error event %d , err: %q", watchEvent.State, watchErr.Error())
				return
			}

			if watchEvent.State == zk.StateExpired {
				exitReason = "receive StateExpired "
				sf.logger.Printf("zookeeper node %s expired %d !", watchEvent.Path, watchEvent.Type)
				return
			}
			sf.dealZKEvent(watchEvent)
		}
	}
}

// createParentPath makes sure path exists as a regular (non-ephemeral) node,
// creating every missing ancestor first. The root path "/" needs no work.
// It returns the first zookeeper error encountered, or nil.
func (sf *zkServiceInfra) createParentPath(path string) error {
	if path == "/" {
		return nil
	}

	//check node exits
	exists, _, err := sf.conn.Exists(path)
	if err != nil {
		sf.logger.Printf("check parent path %s error %v ", path, err)
		return err
	}
	if exists {
		return nil
	}

	// zk: ephemeral nodes may not have children, so the path is created as a
	// regular node after its ancestors.
	for i, v := range path {
		//create parents path
		if v == '/' && i != 0 {
			if err = sf.tryCreateZKPath(path[:i]); err != nil {
				return err
			}
		}
	}

	_, err = sf.conn.Create(path, nil, 0, zk.WorldACL(zk.PermAll))
	// Several processes may start at the same time and race to create the
	// shared prefix; losing that race still leaves the path in place.
	if err != nil && !errors.Is(err, zk.ErrNodeExists) {
		return err
	}
	return nil
}

// tryCreateZKPath creates path as a regular (non-ephemeral) node unless it
// already exists. Every caller in this file passes an ancestor of the node it
// is about to create, and zookeeper refuses to create children under an
// ephemeral node, so the node must not be ephemeral.
// It returns the zookeeper error on failure, or nil.
func (sf *zkServiceInfra) tryCreateZKPath(path string) error {
	exists, _, err := sf.conn.Exists(path)
	if err != nil {
		//TODO add error log
		return err
	}
	if exists {
		//path has create
		return nil
	}

	// try create zookeeper nodes
	_, err = sf.conn.Create(path, nil, 0, zk.WorldACL(zk.PermAll))
	// Someone else creating the node between Exists and Create is fine.
	if err != nil && !errors.Is(err, zk.ErrNodeExists) {
		//TODO add error log
		return err
	}
	return nil
}

// dealZKEvent routes a zookeeper watch event to the matching handler based on
// its type. Data-changed events and any other types are ignored.
func (sf *zkServiceInfra) dealZKEvent(event zk.Event) {
	nodePath := event.Path

	switch kind := event.Type; kind {
	case zk.EventNodeDeleted:
		// The node was removed; raised by watches set through exists/get.
		sf.onNodeDeleted(nodePath)
	case zk.EventNodeChildrenChanged:
		// The child list changed; raised only by watches set on the child
		// list of a node (get_children).
		sf.onChildrenChanged(nodePath)
	case zk.EventNodeCreated:
		// A new zookeeper node appeared.
		sf.onNodeCreated(nodePath)
	case zk.EventNodeDataChanged:
		// Node data changed; intentionally not handled.
	}
}

// dealLoopEvent picks the handler that matches event.eType and runs it with
// the event. Unknown types are logged with a warning and dropped.
func (sf *zkServiceInfra) dealLoopEvent(event *zkEventData) {
	var handler func(*zkEventData)

	switch event.eType {
	case zkFindService:
		handler = sf.doFindService
	case zkRegisterService:
		handler = sf.doRegisterService
	case zkChangeHost:
		handler = sf.doChangeService
	case zkDeleteHost:
		handler = sf.doDeleteService
	default:
		slog.Warn("[Zookeeper] unsupported event type %d ", event.eType)
		return
	}

	handler(event)
}

// doFindService handles a queued find-service request: it checks (with a
// watch) whether the service node exists, reads its children (with a watch)
// and posts the outcome to zkMainCh as a zkChangeHost event.
//
// Nothing is posted when the node does not exist, or when it exists but has
// no children and no error occurred. Errors from either zookeeper call are
// posted in event.err so the listener is informed.
func (sf *zkServiceInfra) doFindService(event *zkEventData) {
	exists, _, _, err := sf.conn.ExistsW(event.path)
	// Check the error before the existence flag: a failed call reports
	// exists == false, which would otherwise hide the error completely.
	if err != nil {
		event.eType = zkChangeHost
		event.err = err
		sf.zkMainCh <- event
		return
	}
	if !exists {
		// Nothing to report yet; presumably the watch registered by ExistsW
		// announces a later creation through the watcher channel.
		return
	}

	event.eType = zkChangeHost

	//not use zookeeper stat and event channel yet
	event.hosts, _, _, event.err = sf.conn.ChildrenW(event.path)
	// Only an error-free empty child list is skipped; a failed ChildrenW also
	// yields no hosts but must still reach the listener.
	if event.err == nil && len(event.hosts) == 0 {
		return
	}
	sf.zkMainCh <- event
}

// doRegisterService handles a queued register request: it makes sure the
// service node at event.path exists (creating missing parents first) and then
// creates one ephemeral child per entry in event.hosts. It stops at the first
// failure, logs it and returns; nothing is reported back through the event.
func (sf *zkServiceInfra) doRegisterService(event *zkEventData) {
	exists, _, err := sf.conn.Exists(event.path)
	if err != nil {
		slog.Error("[Zookeeper] %s check path exists error %v", event.path, err)
		return
	}
	if !exists {
		//TODO check path from cache
		for i, v := range event.path {
			//create parents path
			if v == '/' && i != 0 {
				if err = sf.tryCreateZKPath(event.path[:i]); err != nil {
					slog.Error("[Zookeeper] create path %s error %v", event.path[:i], err)
					return
				}
			}
		}
		// The service node gets host children below it, and zookeeper does
		// not allow children under an ephemeral node, so it must be a regular
		// node (same as RegisterService).
		_, err = sf.conn.Create(event.path, nil, 0, zk.WorldACL(zk.PermAll))
		if err != nil && !errors.Is(err, zk.ErrNodeExists) {
			slog.Error("[Zookeeper] create path %s error %v", event.path, err)
			return
		}
	}

	// set service info
	for _, v := range event.hosts {
		hostPath := event.path + "/" + v
		_, err = sf.conn.Create(hostPath, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		// A host that is already registered is not a failure.
		if err != nil && !errors.Is(err, zk.ErrNodeExists) {
			slog.Error("[Zookeeper] create path %s error %v", hostPath, err)
			return
		}
	}
}

// doChangeService forwards a host-change event to the installed listener, if
// there is one. When the event carries an error the listener receives that
// error and no info; otherwise it receives a ChangedInfo holding the event's
// hosts and a nil error.
func (sf *zkServiceInfra) doChangeService(event *zkEventData) {
	if sf.varietyEH == nil {
		return
	}

	if failure := event.err; failure != nil {
		sf.varietyEH(event.name, service_infra.ServiceChange, nil, failure)
		return
	}

	sf.varietyEH(event.name, service_infra.ServiceChange, &service_infra.ChangedInfo{Hosts: event.hosts}, nil)
}

// doDeleteService forwards a service-delete event, together with whatever
// error the event carries, to the installed listener. Without a listener it
// does nothing.
func (sf *zkServiceInfra) doDeleteService(event *zkEventData) {
	if sf.varietyEH == nil {
		return
	}
	sf.varietyEH(event.name, service_infra.ServiceDelete, nil, event.err)
}

// onNodeCreated reacts to a node-created watch event: it reads the children of
// the new node (registering a child watch) and posts them to zkMainCh as a
// zkChangeHost event. An error-free empty child list is not reported.
func (sf *zkServiceInfra) onNodeCreated(path string) {
	//service's node has been created, add watcher to child
	event := &zkEventData{
		// Tag the event explicitly, as onChildrenChanged does; without a type
		// Tick would dispatch it under the zero-value event type, which is
		// not a host-change notification.
		eType: zkChangeHost,
		name:  getNameFromPath(path),
		path:  path,
	}
	event.hosts, _, _, event.err = sf.conn.ChildrenW(path)
	if event.err == nil && len(event.hosts) == 0 {
		return
	}
	sf.zkMainCh <- event
}

// onChildrenChanged reacts to a children-changed watch event: it re-reads the
// child list of path (which also re-registers the child watch) and posts the
// result, including any error, to zkMainCh as a zkChangeHost event.
func (sf *zkServiceInfra) onChildrenChanged(path string) {
	serviceName := getNameFromPath(path)
	hosts, _, _, err := sf.conn.ChildrenW(path)

	sf.zkMainCh <- &zkEventData{
		eType: zkChangeHost,
		name:  serviceName,
		path:  path,
		hosts: hosts,
		err:   err,
	}
}

// onNodeDeleted reacts to a node-deleted watch event: it re-registers the
// exists watch on path and posts a zkDeleteHost event to zkMainCh, carrying
// any error returned by that re-registration.
func (sf *zkServiceInfra) onNodeDeleted(path string) {
	deleted := new(zkEventData)
	deleted.eType = zkDeleteHost
	deleted.path = path

	// Only the error of the re-armed exists watch is kept.
	_, _, _, deleted.err = sf.conn.ExistsW(path)
	deleted.name = getNameFromPath(path)

	sf.zkMainCh <- deleted
}
